/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//   $Id: altertableprocessor.cpp 9744 2013-08-07 03:32:19Z bwilkinson $

/** @file */

#include <unistd.h>
#include <typeinfo>
#include <regex>
#include <string>
#include <vector>
using namespace std;

#include <boost/shared_ptr.hpp>
#include <boost/algorithm/string/case_conv.hpp>

#include "altertableprocessor.h"

#include "brm.h"
using namespace BRM;

#include "calpontsystemcatalog.h"
using namespace execplan;

#include "ddlpkg.h"
using namespace ddlpackage;

#include "sqllogger.h"
#include "messagelog.h"
using namespace logging;

#include "mariadb_my_sys.h"

#include "we_messages.h"
#include "we_ddlcommandclient.h"
#include "we_ddlcommon.h"
using namespace WriteEngine;

#include "oamcache.h"
using namespace oam;

#include "bytestream.h"
using namespace messageqcpp;

#include "cacheutils.h"
using namespace cacheutils;

#include "IDBDataFile.h"
#include "IDBPolicy.h"
using namespace idbdatafile;

#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wpotentially-evaluated-expression"
// for warnings on typeid :expression with side effects will be evaluated despite being used as an operand to
// 'typeid'
#endif

// TODO: this should be in a common header somewhere
struct extentInfo
{
  uint16_t dbRoot;
  uint32_t partition;
  uint16_t segment;
  bool operator==(const extentInfo& rhs) const
  {
    return (dbRoot == rhs.dbRoot && partition == rhs.partition && segment == rhs.segment);
  }
  bool operator!=(const extentInfo& rhs) const
  {
    return !(*this == rhs);
  }
};

namespace
{

bool checkTextTypesLengthAreEqual(const CalpontSystemCatalog::ColType& colType, const ColumnType& newType)
{
  auto newTypeCharset = get_charset(newType.fCharsetNum, MYF(MY_WME));
  auto bytesInChar = newTypeCharset ? newTypeCharset->mbmaxlen : colType.getCharset()->mbmaxlen;
  return colType.colWidth == newType.fLength * bytesInChar;
}

bool typesAreSame(const CalpontSystemCatalog::ColType& colType, const ColumnType& newType)
{
  switch (colType.colDataType)
  {
    case (CalpontSystemCatalog::BIT):
      if (newType.fType == DDL_BIT)
        return true;

      break;

    case (CalpontSystemCatalog::TINYINT):
      if (newType.fType == DDL_TINYINT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      //@Bug 5443 Not allow user change data type.
      // Not sure this is possible...
      // if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision &&
      //	colType.scale == newType.fScale) return true;
      break;

    case (CalpontSystemCatalog::UTINYINT):
      if (newType.fType == DDL_UNSIGNED_TINYINT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      break;

    case (CalpontSystemCatalog::CHAR):
      if (newType.fType == DDL_CHAR && checkTextTypesLengthAreEqual(colType, newType))
        return true;

      break;

    case (CalpontSystemCatalog::SMALLINT):
      if (newType.fType == DDL_SMALLINT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      // if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision &&
      //	colType.scale == newType.fScale) return true;
      break;

    case (CalpontSystemCatalog::USMALLINT):
      if (newType.fType == DDL_UNSIGNED_SMALLINT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      break;

    case (CalpontSystemCatalog::DECIMAL):
      if ((newType.fType == DDL_DECIMAL || newType.fType == DDL_NUMERIC) &&
          colType.precision == newType.fPrecision && colType.scale == newType.fScale)
        return true;

      break;

    case (CalpontSystemCatalog::UDECIMAL):
      if ((newType.fType == DDL_UNSIGNED_DECIMAL || newType.fType == DDL_UNSIGNED_NUMERIC) &&
          colType.precision == newType.fPrecision && colType.scale == newType.fScale)
        return true;

      break;

    case (CalpontSystemCatalog::MEDINT):
      if (newType.fType == DDL_MEDINT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      //@Bug 5443 Not allow user change data type.
      // if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision &&
      //	colType.scale == newType.fScale) return true;
      break;

    case (CalpontSystemCatalog::UMEDINT):
      if (newType.fType == DDL_UNSIGNED_MEDINT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      break;

    case (CalpontSystemCatalog::INT):
      if (newType.fType == DDL_INT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      // if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision &&
      //	colType.scale == newType.fScale) return true;
      break;

    case (CalpontSystemCatalog::UINT):
      if (newType.fType == DDL_UNSIGNED_INT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      break;

    case (CalpontSystemCatalog::FLOAT):
      if (newType.fType == DDL_FLOAT)
        return true;

      break;

    case (CalpontSystemCatalog::UFLOAT):
      if (newType.fType == DDL_UNSIGNED_FLOAT)
        return true;

      break;

    case (CalpontSystemCatalog::DATE):
      if (newType.fType == DDL_DATE)
        return true;

      break;

    case (CalpontSystemCatalog::BIGINT):
      if (newType.fType == DDL_BIGINT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      //@Bug 5443 Not allow user change data type.
      // decimal is mapped to bigint in syscat
      // if (newType.fType == DDL_DECIMAL && colType.precision == newType.fPrecision &&
      //	colType.scale == newType.fScale) return true;
      break;

    case (CalpontSystemCatalog::UBIGINT):
      if (newType.fType == DDL_UNSIGNED_BIGINT && colType.precision == newType.fPrecision &&
          colType.scale == newType.fScale)
        return true;

      break;

    case (CalpontSystemCatalog::DOUBLE):
      if (newType.fType == DDL_DOUBLE)
        return true;

      break;

    case (CalpontSystemCatalog::UDOUBLE):
      if (newType.fType == DDL_UNSIGNED_DOUBLE)
        return true;

      break;

    case (CalpontSystemCatalog::DATETIME):
      if (newType.fType == DDL_DATETIME)
        return true;

      break;

    case (CalpontSystemCatalog::TIMESTAMP):
      if (newType.fType == DDL_TIMESTAMP)
        return true;

      break;

    case (CalpontSystemCatalog::TIME):
      if (newType.fType == DDL_TIME)
        return true;

      break;

    case (CalpontSystemCatalog::VARCHAR):
      if (newType.fType == DDL_VARCHAR && checkTextTypesLengthAreEqual(colType, newType))
        return true;

      break;

    case (CalpontSystemCatalog::VARBINARY):
      if (newType.fType == DDL_VARBINARY && colType.colWidth == newType.fLength)
        return true;

      break;

    case (CalpontSystemCatalog::CLOB): break;

    case (CalpontSystemCatalog::BLOB):
      if (newType.fType == DDL_BLOB && colType.colWidth == newType.fLength)
        return true;

      break;

    case (CalpontSystemCatalog::TEXT):
      if (newType.fType == DDL_TEXT && colType.colWidth == newType.fLength)
        return true;

      break;

    default: break;
  }

  return false;
}

bool comptypesAreCompat(int oldCtype, int newCtype)
{
  switch (oldCtype)
  {
    case 1:
    case 2: return (newCtype == 1 || newCtype == 2);

    default: break;
  }

  return (oldCtype == newCtype);
}

}  // namespace

namespace ddlpackageprocessor
{
AlterTableProcessor::DDLResult AlterTableProcessor::processPackageInternal(
    ddlpackage::SqlStatement* sqlTableStmt)
{
  SUMMARY_INFO("AlterTableProcessor::processPackage");

  DDLResult result;
  BRM::TxnID txnID;
  txnID.id = fTxnid.id;
  txnID.valid = fTxnid.valid;
  result.result = NO_ERROR;
  std::string err;
  uint64_t tableLockId = 0;

  auto* alterTableStmt = dynamic_cast<AlterTableStatement*>(sqlTableStmt);
  if (!alterTableStmt)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("AlterTableStatement wrong cast");
    message.format(args);
    result.result = ALTER_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnID);
    return result;
  }

  DETAIL_INFO(alterTableStmt);
  int rc = 0;
  rc = fDbrm->isReadWrite();

  if (rc != 0)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("Unable to execute the statement due to DBRM is read only");
    message.format(args);
    result.result = ALTER_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnID);
    return result;
  }

  //@Bug 4538. Log the sql statement before grabbing tablelock
  string stmt = alterTableStmt->fSql + "|" + (alterTableStmt->fTableName)->fSchema + "|";
  SQLLogger logger(stmt, fDDLLoggingId, alterTableStmt->fSessionID, txnID.id);

  VERBOSE_INFO("Getting current txnID");
  OamCache* oamcache = OamCache::makeOamCache();
  std::vector<int> moduleIds = oamcache->getModuleIds();
  uint64_t uniqueId = 0;

  // Bug 5070. Added exception handling
  try
  {
    uniqueId = fDbrm->getUnique64();
  }
  catch (std::exception& ex)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add(ex.what());
    message.format(args);
    result.result = ALTER_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnID);
    return result;
  }
  catch (...)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("Unknown error occurred while getting unique number.");
    message.format(args);
    result.result = ALTER_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnID);
    return result;
  }

  fWEClient->addQueue(uniqueId);

  try
  {
    // check table lock
    boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
        CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt->fSessionID);
    systemCatalogPtr->identity(CalpontSystemCatalog::EC);
    systemCatalogPtr->sessionID(alterTableStmt->fSessionID);
    CalpontSystemCatalog::TableName tableName;
    tableName.schema = (alterTableStmt->fTableName)->fSchema;
    tableName.table = (alterTableStmt->fTableName)->fName;
    execplan::CalpontSystemCatalog::ROPair roPair;
    roPair = systemCatalogPtr->tableRID(tableName);

    uint32_t processID = ::getpid();
    int32_t txnid = txnID.id;
    int32_t sessionId = alterTableStmt->fSessionID;
    std::string processName("DDLProc");
    int i = 0;

    std::vector<uint32_t> pms;

    for (unsigned i = 0; i < moduleIds.size(); i++)
    {
      pms.push_back((uint32_t)moduleIds[i]);
    }

    try
    {
      tableLockId =
          fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING);
    }
    catch (std::exception&)
    {
      throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
    }

    if (tableLockId == 0)
    {
      int waitPeriod = 10;
      int sleepTime = 100;  // sleep 100 milliseconds between checks
      int numTries = 10;    // try 10 times per second
      waitPeriod = Config::getWaitPeriod();
      numTries = waitPeriod * 10;
      struct timespec rm_ts;

      rm_ts.tv_sec = sleepTime / 1000;
      rm_ts.tv_nsec = sleepTime % 1000 * 1000000;

      for (; i < numTries; i++)
      {
        struct timespec abs_ts;

        do
        {
          abs_ts.tv_sec = rm_ts.tv_sec;
          abs_ts.tv_nsec = rm_ts.tv_nsec;
        } while (nanosleep(&abs_ts, &rm_ts) < 0);

        try
        {
          processID = ::getpid();
          txnid = txnID.id;
          sessionId = alterTableStmt->fSessionID;
          ;
          processName = "DDLProc";
          tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid,
                                            BRM::LOADING);
        }
        catch (std::exception&)
        {
          throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
        }

        if (tableLockId > 0)
          break;
      }

      if (i >= numTries)  // error out
      {
        logging::Message::Args args;
        string strOp("alter");
        args.add(strOp);
        args.add(processName);
        args.add((uint64_t)processID);
        args.add(sessionId);
        throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
      }
    }

    ddlpackage::AlterTableActionList actionList = alterTableStmt->fActions;
    AlterTableActionList::const_iterator action_iterator = actionList.begin();

    while (action_iterator != actionList.end())
    {
      std::string s(typeid(*(*action_iterator)).name());

      if (s.find(AlterActionString[0]) != string::npos)
      {
        // bug 827:change AtaAddColumn to AtaAddColumns
        // Add a column
        // Bug 1192
        ddlpackage::ColumnDef* columnDefPtr = 0;
        ddlpackage::AtaAddColumn* addColumnPtr = dynamic_cast<AtaAddColumn*>(*action_iterator);

        if (addColumnPtr)
        {
          columnDefPtr = addColumnPtr->fColumnDef;
        }
        else
        {
          ddlpackage::AtaAddColumns& addColumns = *(dynamic_cast<AtaAddColumns*>(*action_iterator));
          columnDefPtr = addColumns.fColumns[0];
        }

        addColumn(alterTableStmt->fSessionID, txnID.id, result, columnDefPtr, *(alterTableStmt->fTableName),
                  uniqueId);

        if (result.result != NO_ERROR)
        {
          err = "AlterTable: add column failed";
          throw std::runtime_error(err);
        }
      }
      else if (s.find(AlterActionString[6]) != string::npos)
      {
        // Drop Column Default
        dropColumnDefault(alterTableStmt->fSessionID, txnID.id, result,
                          *(dynamic_cast<AtaDropColumnDefault*>(*action_iterator)),
                          *(alterTableStmt->fTableName), uniqueId);

        if (result.result != NO_ERROR)
        {
          err = "AlterTable: drop column default failed";
          throw std::runtime_error(err);
        }
      }
      else if (s.find(AlterActionString[3]) != string::npos)
      {
        // Drop Columns
        dropColumns(alterTableStmt->fSessionID, txnID.id, result,
                    *(dynamic_cast<AtaDropColumns*>(*action_iterator)), *(alterTableStmt->fTableName),
                    uniqueId);
      }
      else if (s.find(AlterActionString[2]) != string::npos)
      {
        // Drop a column
        dropColumn(alterTableStmt->fSessionID, txnID.id, result,
                   *(dynamic_cast<AtaDropColumn*>(*action_iterator)), *(alterTableStmt->fTableName),
                   uniqueId);
      }

#if 0
            else if (s.find(AlterActionString[4]) != string::npos)
            {
                //Add Table Constraint
                addTableConstraint (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast<AtaAddTableConstraint*> (*action_iterator)), *(alterTableStmt.fTableName));

            }

#endif
      else if (s.find(AlterActionString[5]) != string::npos)
      {
        // Set Column Default
        setColumnDefault(alterTableStmt->fSessionID, txnID.id, result,
                         *(dynamic_cast<AtaSetColumnDefault*>(*action_iterator)),
                         *(alterTableStmt->fTableName), uniqueId);
      }

#if 0
            else if (s.find(AlterActionString[7]) != string::npos)
            {
                //Drop Table Constraint
                dropTableConstraint (alterTableStmt.fSessionID, txnID.id, result, *(dynamic_cast<AtaDropTableConstraint*> (*action_iterator)), *(alterTableStmt.fTableName));

            }

#endif

      else if (s.find(AlterActionString[8]) != string::npos)
      {
        // Rename Table
        renameTable(alterTableStmt->fSessionID, txnID.id, result,
                    *(dynamic_cast<AtaRenameTable*>(*action_iterator)), *(alterTableStmt->fTableName),
                    uniqueId);
      }

      else if (s.find(AlterActionString[10]) != string::npos)
      {
        // Rename a Column
        renameColumn(alterTableStmt->fSessionID, txnID.id, result,
                     *(dynamic_cast<AtaRenameColumn*>(*action_iterator)), *(alterTableStmt->fTableName),
                     uniqueId);
      }
      else if (s.find(AlterActionString[11]) != string::npos)
      {
        // Table Comment
        tableComment(alterTableStmt->fSessionID, txnID.id, result,
                     *(dynamic_cast<AtaTableComment*>(*action_iterator)), *(alterTableStmt->fTableName),
                     uniqueId);
      }
      else
      {
        throw std::runtime_error("Altertable: Error in the action type");
      }

      ++action_iterator;
    }

    // Log the DDL statement.
    logging::logDDL(alterTableStmt->fSessionID, txnID.id, alterTableStmt->fSql, alterTableStmt->fOwner);

    DETAIL_INFO("Commiting transaction");
    commitTransaction(uniqueId, txnID);
    fSessionManager.committed(txnID);
  }
  catch (std::exception& ex)
  {
    if (checkPPLostConnection(ex.what()))
    {
      if (tableLockId)
      {
        try
        {
          (void)fDbrm->releaseTableLock(tableLockId);
        }
        catch (std::exception&)
        {
          if (result.result == NO_ERROR)
          {
            logging::Message::Args args;
            logging::Message message(1);
            args.add("Table lock is not released due to ");
            args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
            args.add("");
            args.add("");
            message.format(args);
            result.result = ALTER_ERROR;
            result.message = message;
            return result;
          }
        }
      }
      result.result = PP_LOST_CONNECTION;
      fWEClient->removeQueue(uniqueId);
      return result;
    }
    else
    {
      rollBackAlter(ex.what(), txnID, alterTableStmt->fSessionID, result, uniqueId);
    }
  }
  catch (...)
  {
    rollBackAlter("encountered unknown exception. ", txnID, alterTableStmt->fSessionID, result, uniqueId);
  }

  // release table lock
  try
  {
    (void)fDbrm->releaseTableLock(tableLockId);
    // cout << "table lock " << tableLockId << " is released" << endl;
  }
  catch (std::exception&)
  {
    if (result.result == NO_ERROR)
    {
      logging::Message::Args args;
      logging::Message message(1);
      args.add("Table lock is not released due to ");
      args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
      args.add("");
      args.add("");
      message.format(args);
      result.result = ALTER_ERROR;
      result.message = message;
    }
  }

  fWEClient->removeQueue(uniqueId);
  return result;
}

void AlterTableProcessor::rollBackAlter(const string& error, BRM::TxnID txnID, int sessionId,
                                        DDLResult& result, uint64_t uniqueId)
{
  DETAIL_INFO("Rolling back transaction");
  cerr << "AltertableProcessor::processPackage: " << error << endl;

  logging::Message::Args args;
  logging::Message message(1);
  args.add("Alter table Failed: ");
  args.add(error);
  args.add("");
  args.add("");
  message.format(args);

  rollBackTransaction(uniqueId, txnID, sessionId);
  fSessionManager.rolledback(txnID);
  result.result = ALTER_ERROR;
  result.message = message;
}

void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                    DDLResult& result, ddlpackage::ColumnDef* columnDefPtr,
                                    ddlpackage::QualifiedName& inTableName, const uint64_t uniqueId)
{
  std::string err("AlterTableProcessor::addColumn ");
  SUMMARY_INFO(err);
  // Allocate an object ID for the column we are about to create, non systables only
  VERBOSE_INFO("Allocating object ID for a column");
  ByteStream bs;
  ByteStream::byte tmp8;
  int rc = 0;
  std::string errorMsg;
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  bool isDict = false;
  //@Bug 4111. Check whether the column exists in calpont systable
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::FE);
  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = inTableName.fSchema;
  tableColName.table = inTableName.fName;
  tableColName.column = columnDefPtr->fName;
  CalpontSystemCatalog::OID columnOid;

  try
  {
    columnOid = systemCatalogPtr->lookupOID(tableColName);
  }
  catch (std::exception& ex)
  {
    result.result = ALTER_ERROR;
    err += ex.what();
    throw std::runtime_error(err);
  }
  catch (...)
  {
    result.result = ALTER_ERROR;
    err += "Unknown exception caught";
    throw std::runtime_error(err);
  }

  if (columnOid > 0)  // Column exists already
  {
    err = err + "Internal add column error for " + tableColName.schema + "." + tableColName.table + "." +
          tableColName.column + ". Column exists already. Your table is probably out-of-sync";

    throw std::runtime_error(err);
  }

  int dataType = WriteEngine::convertDataType(columnDefPtr->fType->fType);

  if ((dataType == CalpontSystemCatalog::CHAR && columnDefPtr->fType->fLength > 8) ||
      (dataType == CalpontSystemCatalog::VARCHAR && columnDefPtr->fType->fLength > 7) ||
      (dataType == CalpontSystemCatalog::VARBINARY && columnDefPtr->fType->fLength > 7) ||
      (dataType == CalpontSystemCatalog::TEXT) || (dataType == CalpontSystemCatalog::BLOB))
  {
    isDict = true;
  }

  // Find out where syscolumn are
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot ");

  int pmNum = 1;
  OamCache* oamcache = OamCache::makeOamCache();
  boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
  pmNum = (*dbRootPMMap)[dbRoot];

  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  // Will create files on each PM as needed.
  // BUG931
  // In order to fill up the new column with data an existing column is selected as reference
  ColumnList columns;
  getColumnsForTable(sessionID, inTableName.fSchema, inTableName.fName, columns);
  ColumnList::const_iterator column_iterator;
  column_iterator = columns.begin();

  if (inTableName.fSchema != CALPONT_SCHEMA)
  {
    try
    {
      execplan::ObjectIDManager fObjectIDManager;

      if (isDict)
      {
        fStartingColOID = fObjectIDManager.allocOIDs(2);
      }
      else
        fStartingColOID = fObjectIDManager.allocOIDs(1);
    }
    catch (std::exception& ex)
    {
      result.result = ALTER_ERROR;
      err += ex.what();
      throw std::runtime_error(err);
    }

    // cout << "new oid is " << fStartingColOID << endl;
  }
  else
  {
    // Add columns to SYSTABLE and SYSCOLUMN
    if ((inTableName.fName == SYSTABLE_TABLE) && (columnDefPtr->fName == AUTOINC_COL))
    {
      fStartingColOID = OID_SYSTABLE_AUTOINCREMENT;
    }
    else if ((inTableName.fName == SYSCOLUMN_TABLE) && (columnDefPtr->fName == COMPRESSIONTYPE_COL))
    {
      fStartingColOID = OID_SYSCOLUMN_COMPRESSIONTYPE;
    }
    else if ((inTableName.fName == SYSCOLUMN_TABLE) && (columnDefPtr->fName == NEXTVALUE_COL))
    {
      fStartingColOID = OID_SYSCOLUMN_NEXTVALUE;
    }
    else
    {
      throw std::runtime_error("Error adding column to calpontsys table");
    }

    columnDefPtr->fType->fCompressiontype = 0;
    columnDefPtr->fType->fAutoincrement = "n";
    columnDefPtr->fType->fNextvalue = 0;
    cerr << "updating calpontsys...using static OID " << fStartingColOID << endl;
  }

  fColumnNum = 1;
  // Find the position for the last column
  //@Bug 1358
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = inTableName.fSchema;
  tableName.table = inTableName.fName;
  std::set<BRM::LogicalPartition> outOfSerPar;
  CalpontSystemCatalog::ROPair ropair;
  bool autoincrement = false;

  try
  {
    ropair = systemCatalogPtr->tableRID(tableName);

    if (ropair.objnum < 0)
    {
      err = "No such table: " + tableName.table;
      throw std::runtime_error(err);
    }

    int totalColumns = systemCatalogPtr->colNumbers(tableName);
    ColumnDefList aColumnList;
    aColumnList.push_back(columnDefPtr);
    bool alterFlag = true;

    // MCOL-66 The DBRM can't handle concurrent DDL
    boost::mutex::scoped_lock lk(dbrmMutex);

    if (inTableName.fSchema != CALPONT_SCHEMA)
    {
      VERBOSE_INFO("Writing meta data to SYSCOL");  // send to WES to process
      bs.restart();
      bs << (ByteStream::byte)WE_SVR_WRITE_SYSCOLUMN;
      bs << uniqueId;
      bs << sessionID;
      bs << (uint32_t)txnID;
      bs << inTableName.fSchema;
      bs << inTableName.fName;
      bs << (uint32_t)fStartingColOID;

      if (isDict)
        bs << (uint32_t)(fStartingColOID + 1);
      else
        bs << (uint32_t)0;

      bs << (uint8_t)alterFlag;
      bs << (uint32_t)totalColumns;
      columnDefPtr->serialize(bs);

      // send to WES to process
      try
      {
        fWEClient->write(bs, (uint32_t)pmNum);

        while (1)
        {
          bsIn.reset(new ByteStream());
          fWEClient->read(uniqueId, bsIn);

          if (bsIn->length() == 0)  // read error
          {
            rc = NETWORK_ERROR;
            errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
            break;
          }
          else
          {
            *bsIn >> tmp8;
            rc = tmp8;

            if (rc != 0)
            {
              *bsIn >> errorMsg;
            }

            break;
          }
        }
      }
      catch (runtime_error& ex)  // write error
      {
        rc = NETWORK_ERROR;
        errorMsg = ex.what();
      }
      catch (...)
      {
        rc = NETWORK_ERROR;
        errorMsg = " Unknown exception caught while updating SYSTABLE.";
      }

      if (rc != 0)
        throw std::runtime_error(errorMsg);
    }

    if ((columnDefPtr->fType->fAutoincrement).compare("y") == 0)
    {
      // update systable autoincrement column
      sysOid = 1001;
      // Find out where systable is
      rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

      if (rc != 0)
        throw std::runtime_error("Error while calling getSysCatDBRoot ");

      pmNum = (*dbRootPMMap)[dbRoot];
      bs.restart();
      bs << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO;
      bs << uniqueId;
      bs << sessionID;
      bs << (uint32_t)txnID;
      bs << inTableName.fSchema;
      bs << inTableName.fName;
      bs << (uint32_t)1;

      // send to WES to process
      try
      {
        fWEClient->write(bs, (uint32_t)pmNum);

        while (1)
        {
          bsIn.reset(new ByteStream());
          fWEClient->read(uniqueId, bsIn);

          if (bsIn->length() == 0)  // read error
          {
            rc = NETWORK_ERROR;
            errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
            break;
          }
          else
          {
            *bsIn >> tmp8;
            rc = tmp8;

            if (rc != 0)
            {
              *bsIn >> errorMsg;
            }

            break;
          }
        }
      }
      catch (runtime_error& ex)  // write error
      {
        rc = NETWORK_ERROR;
        errorMsg = ex.what();
      }
      catch (...)
      {
        rc = NETWORK_ERROR;
        errorMsg = " Unknown exception caught while updating SYSTABLE.";
      }

      if (rc != 0)
        throw std::runtime_error(errorMsg);

      // start a sequence in controller
      fDbrm->startAISequence(fStartingColOID, columnDefPtr->fType->fNextvalue, columnDefPtr->fType->fLength,
                             convertDataType(columnDefPtr->fType->fType));
    }

    //@Bug 4176. save oids to a log file for cleanup after fail over.
    std::vector<CalpontSystemCatalog::OID> oidList;

    if (isDict)
    {
      oidList.push_back(fStartingColOID);
      oidList.push_back(fStartingColOID + 1);
    }
    else
      oidList.push_back(fStartingColOID);

    createWriteDropLogFile(ropair.objnum, uniqueId, oidList);

    //@Bug 1358,1427 Always use the first column in the table, not the first one in columnlist to prevent
    // random result
    //@Bug 4182. Use widest column as reference column

    // Find the widest column
    unsigned int colpos = 0;
    int maxColwidth = 0;

    for (colpos = 0; colpos < columns.size(); colpos++)
    {
      if (columns[colpos].colType.colWidth > maxColwidth)
        maxColwidth = columns[colpos].colType.colWidth;
    }

    while (column_iterator != columns.end())
    {
      if (column_iterator->colType.colWidth == maxColwidth)
      {
        // If there is atleast one existing column then use that as a reference to initialize the new column
        // rows. get dbroot information

        // fDbrm->getStartExtent((*column_iterator).oid, dbroot, partitionNum, true);

        rc = fDbrm->getOutOfServicePartitions(column_iterator->oid, outOfSerPar);

        if (rc != 0)
        {
          string errorMsg;
          BRM::errString(rc, errorMsg);
          ostringstream oss;
          oss << "getOutOfServicePartitions failed  due to " << errorMsg;
          throw std::runtime_error(oss.str());
        }

        int dataType1;
        dataType1 = convertDataType(columnDefPtr->fType->fType);

        if (dataType1 == CalpontSystemCatalog::DECIMAL || dataType1 == CalpontSystemCatalog::UDECIMAL)
        {
          columnDefPtr->convertDecimal();
        }

        CalpontSystemCatalog::ColDataType dataType = convertDataType(columnDefPtr->fType->fType);

        if ((columnDefPtr->fType->fAutoincrement).compare("y") == 0)
        {
          autoincrement = true;
        }

        // send to all WES to add the new column
        bs.restart();
        bs << (ByteStream::byte)WE_SVR_FILL_COLUMN;
        bs << uniqueId;
        bs << (uint32_t)txnID;
        bs << (uint32_t)fStartingColOID;

        if (isDict)
          bs << (uint32_t)(fStartingColOID + 1);
        else
          bs << (uint32_t)0;

        // new column info
        bs << (ByteStream::byte)dataType;
        bs << (ByteStream::byte)autoincrement;
        bs << (uint32_t)columnDefPtr->fType->fLength;
        bs << (uint32_t)columnDefPtr->fType->fScale;
        bs << (uint32_t)columnDefPtr->fType->fPrecision;
        std::string tmpStr("");

        if (columnDefPtr->fDefaultValue)
        {
          tmpStr = columnDefPtr->fDefaultValue->fValue;
        }

        bs << tmpStr;
        bs << (ByteStream::byte)columnDefPtr->fType->fCompressiontype;
        // ref column info
        bs << (uint32_t)column_iterator->oid;
        bs << (ByteStream::byte)column_iterator->colType.colDataType;
        bs << (uint32_t)column_iterator->colType.colWidth;
        bs << (ByteStream::byte)column_iterator->colType.compressionType;
        messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
        bs << timeZone;
        // cout << "sending command fillcolumn " << endl;
        uint32_t msgRecived = 0;
        fWEClient->write_to_all(bs);
        bsIn.reset(new ByteStream());

        while (1)
        {
          if (msgRecived == fPMCount)
            break;

          fWEClient->read(uniqueId, bsIn);

          if (bsIn->length() == 0)  // read error
          {
            rc = NETWORK_ERROR;
            break;
          }
          else
          {
            *bsIn >> tmp8;
            *bsIn >> errorMsg;
            rc = tmp8;

            // cout << "Got error code from WES " << rc << endl;
            if (rc != 0)
              break;
            else
              msgRecived++;
          }
        }

        if (rc != 0)  // delete the newly created files before erroring out
        {
          bs.restart();
          bs << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
          bs << uniqueId;
          bs << (uint32_t)oidList.size();

          for (uint32_t i = 0; i < oidList.size(); i++)
          {
            bs << (uint32_t)oidList[i];
          }

          uint32_t msgRecived = 0;

          try
          {
            fWEClient->write_to_all(bs);
            bsIn.reset(new ByteStream());
            ByteStream::byte tmp8;

            while (1)
            {
              if (msgRecived == fWEClient->getPmCount())
                break;

              fWEClient->read(uniqueId, bsIn);

              if (bsIn->length() == 0)  // read error
              {
                rc = NETWORK_ERROR;
                errorMsg = "Lost connection to Write Engine Server while dropping column files";
                break;
              }
              else
              {
                *bsIn >> tmp8;
                rc = tmp8;

                if (rc != 0)
                {
                  *bsIn >> errorMsg;
                  break;
                }
                else
                  msgRecived++;
              }
            }
          }
          catch (runtime_error& ex)  // write error
          {
            rc = NETWORK_ERROR;
            errorMsg = ex.what();
          }
          catch (...)
          {
            rc = NETWORK_ERROR;
            errorMsg = " Unknown exception caught while dropping column files.";
          }

          if (rc == 0)
          {
            fWEClient->removeQueue(uniqueId);
            deleteLogFile(DROPTABLE_LOG, ropair.objnum, uniqueId);
            fWEClient->addQueue(uniqueId);
          }

          throw std::runtime_error(errorMsg);
        }

        // Update nextVal
        break;
      }

      // Update nextVal
      column_iterator++;
    }
  }
  catch (std::exception& ex)
  {
    if (result.result != CREATE_ERROR)
      result.result = ALTER_ERROR;

    err += ex.what();
    throw std::runtime_error(err);
  }
  catch (...)
  {
    result.result = ALTER_ERROR;
    err += "Unknown exception caught";
    throw std::runtime_error(err);
  }

  // update SYSCOLUMN of the new next value
  if (autoincrement)
  {
    DBRM aDbrm;
    aDbrm.getAILock(fStartingColOID);
    uint64_t nextValInController;
    bool validNextVal = aDbrm.getAIValue(fStartingColOID, &nextValInController);

    if (validNextVal)
    {
      WE_DDLCommandClient ddlClient;
      uint8_t rc = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
        rc = ddlClient.UpdateSyscolumnNextval(fStartingColOID, nextValInController, txnID);
      else
        rc = ddlClient.UpdateSyscolumnNextval(fStartingColOID, nextValInController, sessionID);

      aDbrm.releaseAILock(fStartingColOID);

      if (rc != 0)
        throw std::runtime_error("Update SYSCAT next value failed");
    }
    else
    {
      aDbrm.releaseAILock(fStartingColOID);
      throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT));
    }
  }

  std::vector<CalpontSystemCatalog::OID> oidList;
  oidList.push_back(fStartingColOID);

  if (outOfSerPar.size() > 0)
    rc = fDbrm->markPartitionForDeletion(oidList, outOfSerPar, errorMsg);

  if (rc != 0)
  {
    ostringstream oss;
    oss << "Mark partition for deletition failed  due to " << errorMsg;
    throw std::runtime_error(oss.str());
  }

  fWEClient->removeQueue(uniqueId);
  deleteLogFile(DROPTABLE_LOG, ropair.objnum, uniqueId);
  fWEClient->addQueue(uniqueId);
}

void AlterTableProcessor::dropColumn(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                     DDLResult& result, ddlpackage::AtaDropColumn& ataDropColumn,
                                     ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId)
{
  // 1. Get the OIDs for the column
  // 2. Get the OIDs for the dictionary
  // 3. Remove the column from SYSCOLUMN
  // 4. update systable if the dropped column is autoincrement column
  // 5. update column position for affected columns
  // 6. Remove the files

  SUMMARY_INFO("AlterTableProcessor::dropColumn");
  VERBOSE_INFO("Finding object IDs for the column");
  CalpontSystemCatalog::TableColName tableColName;
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = fTableName.fSchema;
  tableName.table = fTableName.fName;
  tableColName.schema = fTableName.fSchema;
  tableColName.table = fTableName.fName;
  tableColName.column = ataDropColumn.fColumnName;
  execplan::CalpontSystemCatalog::DictOIDList dictOIDList;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  //@Bug 1358
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  std::string err;
  execplan::CalpontSystemCatalog::ROPair roPair;
  CalpontSystemCatalog::OID oid;
  CalpontSystemCatalog::ColType colType;

  try
  {
    roPair = systemCatalogPtr->tableRID(tableName);

    oid = systemCatalogPtr->lookupOID(tableColName);
    colType = systemCatalogPtr->colType(oid);
  }
  catch (std::exception& ex)
  {
    throw std::runtime_error(ex.what());
  }

  int colPos = colType.colPosition;
  ByteStream bytestream;
  bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSCOLUMN_ROW;
  bytestream << uniqueId;
  bytestream << sessionID;
  bytestream << (uint32_t)txnID;
  bytestream << fTableName.fSchema;
  bytestream << fTableName.fName;
  bytestream << ataDropColumn.fColumnName;

  std::string errorMsg;
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  ByteStream::byte rc = 0;
  // Find out where syscolumn is
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot ");

  int pmNum = 1;

  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  OamCache* oamcache = OamCache::makeOamCache();
  boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
  pmNum = (*dbRootPMMap)[dbRoot];

  // MCOL-66 The DBRM can't handle concurrent DDL
  boost::mutex::scoped_lock lk(dbrmMutex);

  try
  {
    fWEClient->write(bytestream, (uint32_t)pmNum);
#ifdef IDB_DDL_DEBUG
    cout << "Alter table drop column sending WE_SVR_DELETE_SYSCOLUMN_ROW to pm " << pmNum << endl;
#endif

    while (1)
    {
      bsIn.reset(new ByteStream());
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
        break;
      }
      else
      {
        *bsIn >> rc;
        *bsIn >> errorMsg;
        break;
      }
    }
  }
  catch (runtime_error& ex)  // write error
  {
#ifdef IDB_DDL_DEBUG
    cout << "Alter table drop column got exception" << ex.what() << endl;
#endif
    rc = NETWORK_ERROR;
    errorMsg = ex.what();
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = " Unknown exception caught while updating SYSTABLE.";
#ifdef IDB_DDL_DEBUG
    cout << "Alter table drop column got unknown exception" << endl;
#endif
  }

  if (rc != 0)
    throw std::runtime_error(errorMsg);

  // Update SYSTABLE
  if (colType.autoincrement)
  {
    sysOid = 1001;
    // Find out where systable is
    rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

    if (rc != 0)
      throw std::runtime_error("Error while calling getSysCatDBRoot ");

    pmNum = (*dbRootPMMap)[dbRoot];
    bytestream.restart();
    bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO;
    bytestream << uniqueId;
    bytestream << sessionID;
    bytestream << (uint32_t)txnID;
    bytestream << fTableName.fSchema;
    bytestream << fTableName.fName;
    bytestream << (uint32_t)0;  // autoincrement off

    try
    {
      fWEClient->write(bytestream, (uint32_t)pmNum);
#ifdef IDB_DDL_DEBUG
      cout << "Alter table drop column sending WE_SVR_UPDATE_SYSTABLE_AUTO to pm " << pmNum << endl;
#endif

      while (1)
      {
        bsIn.reset(new ByteStream());
        fWEClient->read(uniqueId, bsIn);

        if (bsIn->length() == 0)  // read error
        {
          rc = NETWORK_ERROR;
          errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
          break;
        }
        else
        {
          *bsIn >> rc;
          *bsIn >> errorMsg;
          break;
        }
      }
    }
    catch (runtime_error& ex)  // write error
    {
#ifdef IDB_DDL_DEBUG
      cout << "Alter table drop column got exception" << ex.what() << endl;
#endif
      rc = NETWORK_ERROR;
      errorMsg = ex.what();
    }
    catch (...)
    {
      rc = NETWORK_ERROR;
      errorMsg = " Unknown exception caught while updating SYSTABLE.";
#ifdef IDB_DDL_DEBUG
      cout << "Alter table drop column got unknown exception" << endl;
#endif
    }

    if (rc != 0)
      throw std::runtime_error(errorMsg);
  }

  // Update column position
  bytestream.restart();
  bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSCOLUMN_COLPOS;
  bytestream << uniqueId;
  bytestream << sessionID;
  bytestream << (uint32_t)txnID;
  bytestream << fTableName.fSchema;
  bytestream << fTableName.fName;
  bytestream << (uint32_t)colPos;
  sysOid = 1021;
  // Find out where syscolumn is
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot ");

  pmNum = (*dbRootPMMap)[dbRoot];

  try
  {
    fWEClient->write(bytestream, (uint32_t)pmNum);
#ifdef IDB_DDL_DEBUG
    cout << "Alter table drop column sending WE_SVR_UPDATE_SYSTABLE_AUTO to pm " << pmNum << endl;
#endif

    while (1)
    {
      bsIn.reset(new ByteStream());
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
        break;
      }
      else
      {
        *bsIn >> rc;
        *bsIn >> errorMsg;
        break;
      }
    }
  }
  catch (runtime_error& ex)  // write error
  {
#ifdef IDB_DDL_DEBUG
    cout << "Alter table drop column got exception" << ex.what() << endl;
#endif
    rc = NETWORK_ERROR;
    errorMsg = ex.what();
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = " Unknown exception caught while updating SYSTABLE.";
#ifdef IDB_DDL_DEBUG
    cout << "Alter table drop column got unknown exception" << endl;
#endif
  }

  if (rc != 0)
    throw std::runtime_error(errorMsg);

  // commit the transaction.
  BRM::TxnID aTxnID;
  aTxnID.id = txnID;
  aTxnID.valid = true;
  commitTransaction(uniqueId, aTxnID);

  // Bug 4208 Drop the PrimProcFDCache before droping the column files
  // FOr Windows, this ensures (most likely) that the column files have
  // no open handles to hinder the deletion of the files.
  rc = cacheutils::dropPrimProcFdCache();

  VERBOSE_INFO("Removing column files");
  // Drop files
  std::vector<CalpontSystemCatalog::OID> oidList;
  bytestream.restart();
  bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
  bytestream << uniqueId;

  if (colType.ddn.dictOID > 3000)  //@bug 4847. need to take care varchar(8)
  {
    bytestream << (uint32_t)2;
    bytestream << (uint32_t)oid;
    bytestream << (uint32_t)colType.ddn.dictOID;
    oidList.push_back(oid);
    oidList.push_back(colType.ddn.dictOID);
  }
  else
  {
    bytestream << (uint32_t)1;
    bytestream << (uint32_t)oid;
    oidList.push_back(oid);
  }

  // Save the oids to a file
  uint32_t msgRecived = 0;
  bool fileDropped = true;

  try
  {
    createWriteDropLogFile(roPair.objnum, uniqueId, oidList);
    //@Bug 4811. Need to send to all PMs
    fWEClient->write_to_all(bytestream);
#ifdef IDB_DDL_DEBUG
    cout << "Alter table drop column sending WE_SVR_UPDATE_SYSTABLE_AUTO to pm " << pmNum << endl;
#endif
    bsIn.reset(new ByteStream());
    ByteStream::byte tmp8;

    while (1)
    {
      if (msgRecived == fWEClient->getPmCount())
        break;

      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Lost connection to Write Engine Server while dropping column files";
        break;
      }
      else
      {
        *bsIn >> tmp8;
        rc = tmp8;

        if (rc != 0)
        {
          *bsIn >> errorMsg;
          fileDropped = false;
          break;
        }
        else
          msgRecived++;
      }
    }
  }
  catch (runtime_error& ex)  // write error
  {
#ifdef IDB_DDL_DEBUG
    cout << "Alter table drop column got exception" << ex.what() << endl;
#endif
    rc = NETWORK_ERROR;
    errorMsg = ex.what();
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = " Unknown exception caught while dropping column files.";
#ifdef IDB_DDL_DEBUG
    cout << "create table got unknown exception" << endl;
#endif
  }

  //@Bug 3860
  rc = cacheutils::dropPrimProcFdCache();
  // Flush primProc cache
  rc = cacheutils::flushOIDsFromCache(oidList);
  // Delete extents from extent map
  rc = fDbrm->deleteOIDs(oidList);

  if (fileDropped)
  {
    fWEClient->removeQueue(uniqueId);
    deleteLogFile(DROPTABLE_LOG, roPair.objnum, uniqueId);
    fWEClient->addQueue(uniqueId);
  }
}

void AlterTableProcessor::dropColumns(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                      DDLResult& result, ddlpackage::AtaDropColumns& ataDropColumns,
                                      ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId)
{
  SUMMARY_INFO("AlterTableProcessor::dropColumns");
  ddlpackage::ColumnNameList colList = ataDropColumns.fColumns;
  ddlpackage::ColumnNameList::const_iterator col_iter = colList.begin();

  std::string err;

  try
  {
    while (col_iter != colList.end())
    {
      ddlpackage::AtaDropColumn ataDropColumn;

      ataDropColumn.fColumnName = *col_iter;

      dropColumn(sessionID, txnID, result, ataDropColumn, fTableName, uniqueId);

      if (result.result != NO_ERROR)
      {
        DETAIL_INFO("dropColumns::dropColumn failed");
        return;
      }

      col_iter++;
    }
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    throw std::runtime_error(err);
  }
  catch (...)
  {
    err = "dropColumns:Unknown exception caught";
    throw std::runtime_error(err);
  }
}

void AlterTableProcessor::addTableConstraint(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                             DDLResult& result,
                                             ddlpackage::AtaAddTableConstraint& ataAddTableConstraint,
                                             ddlpackage::QualifiedName& fTableName)
{
  /*TODO: Check if existing row satisfy the constraint.
  If not, the constraint will not be added. */
  SUMMARY_INFO("AlterTableProcessor::addTableConstraint");
  ddlpackage::TableConstraintDefList constrainList;
  constrainList.push_back(ataAddTableConstraint.fTableConstraint);
  VERBOSE_INFO("Writing table constraint meta data to SYSCONSTRAINT");
  // bool alterFlag = true;
  std::string err;

  try
  {
    // writeTableSysConstraintMetaData(sessionID, txnID, result, constrainList, fTableName, alterFlag);
    VERBOSE_INFO("Writing table constraint meta data to SYSCONSTRAINTCOL");
    // writeTableSysConstraintColMetaData(sessionID, txnID, result,constrainList, fTableName, alterFlag);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    throw std::runtime_error(err);
  }
  catch (...)
  {
    err = "addTableConstraint:Unknown exception caught";
    throw std::runtime_error(err);
  }
}

void AlterTableProcessor::setColumnDefault(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                           DDLResult& result,
                                           ddlpackage::AtaSetColumnDefault& ataSetColumnDefault,
                                           ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId)
{
  SUMMARY_INFO("AlterTableProcessor::setColumnDefault");
  /*Steps:
  1. Update SYSCOLUMN for default value change
  */
  SUMMARY_INFO("AlterTableProcessor::setColumnDefault");
  ByteStream bs;
  std::string errorMsg;
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  ByteStream::byte rc = 0;
  // Find out where syscolumns
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot");

  int pmNum = 1;
  OamCache* oamcache = OamCache::makeOamCache();
  boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
  pmNum = (*dbRootPMMap)[dbRoot];

  boost::shared_ptr<messageqcpp::ByteStream> bsIn;

  string err;

  // Update SYSCOLUMN
  bs.restart();
  bs << (ByteStream::byte)WE_SVR_UPDATE_SYSCOLUMN_DEFAULTVAL;
  bs << uniqueId;
  bs << sessionID;
  bs << (uint32_t)txnID;
  bs << fTableName.fSchema;
  bs << fTableName.fName;
  bs << ataSetColumnDefault.fColumnName;
  string defaultValue("");

  if (ataSetColumnDefault.fDefaultValue)
    defaultValue = ataSetColumnDefault.fDefaultValue->fValue;

  bs << defaultValue;

  // send to WES to process
  try
  {
    fWEClient->write(bs, (uint32_t)pmNum);

    while (1)
    {
      bsIn.reset(new ByteStream());
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
        break;
      }
      else
      {
        *bsIn >> rc;

        if (rc != 0)
        {
          *bsIn >> errorMsg;
        }

        break;
      }
    }
  }
  catch (runtime_error& ex)  // write error
  {
    rc = NETWORK_ERROR;
    errorMsg = ex.what();
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = " Unknown exception caught while updating SYSTABLE.";
  }

  if (rc != 0)
    throw std::runtime_error(errorMsg);
}
void AlterTableProcessor::dropColumnDefault(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                            DDLResult& result,
                                            ddlpackage::AtaDropColumnDefault& ataDropColumnDefault,
                                            ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId)
{
  SUMMARY_INFO("AlterTableProcessor::setColumnDefault");
  /*Steps:
  1. Update SYSCOLUMN for default value change
  */
  SUMMARY_INFO("AlterTableProcessor::setColumnDefault");
  ByteStream bs;
  std::string errorMsg;
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  ByteStream::byte rc = 0;
  // Find out where syscolumn is
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot");

  int pmNum = 1;
  OamCache* oamcache = OamCache::makeOamCache();
  boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
  pmNum = (*dbRootPMMap)[dbRoot];

  boost::shared_ptr<messageqcpp::ByteStream> bsIn;

  string err;

  // Update SYSCOLUMN
  bs.restart();
  bs << (ByteStream::byte)WE_SVR_UPDATE_SYSCOLUMN_DEFAULTVAL;
  bs << uniqueId;
  bs << sessionID;
  bs << (uint32_t)txnID;
  bs << fTableName.fSchema;
  bs << fTableName.fName;
  bs << ataDropColumnDefault.fColumnName;
  string defaultValue("");
  bs << defaultValue;

  // send to WES to process
  try
  {
    fWEClient->write(bs, (uint32_t)pmNum);

    while (1)
    {
      bsIn.reset(new ByteStream());
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
        break;
      }
      else
      {
        *bsIn >> rc;

        if (rc != 0)
        {
          *bsIn >> errorMsg;
        }

        break;
      }
    }
  }
  catch (runtime_error& ex)  // write error
  {
    rc = NETWORK_ERROR;
    errorMsg = ex.what();
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = " Unknown exception caught while updating SYSTABLE.";
  }

  if (rc != 0)
    throw std::runtime_error(errorMsg);
}

#if 0
void AlterTableProcessor::dropTableConstraint (uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, DDLResult& result, ddlpackage::AtaDropTableConstraint& ataDropTableConstraint, ddlpackage::QualifiedName& fTableName)
{
    /*Steps tp drop table constraint
     1. Delete the ConstraintName from SYSCONSTRAINT
     2. Delete the corresponding row from SYSCONSTRAINTCOL
     3. Delete the row from SYSINDEX if PK;
     3. Delete the rows from SYSINDEXCOL if PK;
     */
    SUMMARY_INFO("AlterTableProcessor::dropTableConstraint");
    ddlpackage::QualifiedName sysCatalogTableName;
    sysCatalogTableName.fSchema = CALPONT_SCHEMA;
    sysCatalogTableName.fName  = SYSCONSTRAINT_TABLE;
    boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
    systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
    //@Bug 1358
    systemCatalogPtr->identity(CalpontSystemCatalog::EC);
    VERBOSE_INFO("Removing constraint meta data from SYSCONSTRAINT");
    std::string err;

    try
    {
        execplan::CalpontSystemCatalog::RID constrainRid = systemCatalogPtr->constraintRID(ataDropTableConstraint.fConstraintName);

        if (constrainRid == std::numeric_limits<CalpontSystemCatalog::RID>::max())
        {
            // build the logging message
            err = "Alter Table failed: Constraint name not found";
            throw std::runtime_error(err);
        }

        removeRowFromSysCatalog(sessionID, txnID, result, sysCatalogTableName, constrainRid);

        VERBOSE_INFO("Removing constraint meta data from SYSCONSTRAINTCOL");
        sysCatalogTableName.fName  = SYSCONSTRAINTCOL_TABLE;
        execplan::CalpontSystemCatalog::RIDList ridlist = systemCatalogPtr->constraintColRID(ataDropTableConstraint.fConstraintName);

        if (ridlist.size() == 0)
        {
            // build the logging message
            err = "Alter Table failed: Constraint name not found";
            throw std::runtime_error(err);
        }

        removeRowsFromSysCatalog(sessionID, txnID, result, sysCatalogTableName, ridlist);

        execplan::CalpontSystemCatalog::IndexName idxName;
        idxName.schema = fTableName.fSchema;
        idxName.table = fTableName.fName;
        idxName.index = ataDropTableConstraint.fConstraintName;
        execplan::CalpontSystemCatalog::ROPair ropair = systemCatalogPtr->indexRID(idxName);

        if (ropair.rid >= 0)
        {
            sysCatalogTableName.fName  = SYSINDEX_TABLE;
            removeRowFromSysCatalog(sessionID, txnID, result, sysCatalogTableName, ropair.rid);
        }

        execplan::CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->indexColRIDs(idxName);

        if (ridList.size() > 0)
        {
            execplan::CalpontSystemCatalog::RIDList::const_iterator riditer;
            sysCatalogTableName.fName  = SYSINDEXCOL_TABLE;
            riditer = ridList.begin();

            while (riditer != ridList.end())
            {
                ropair = *riditer;
                removeRowFromSysCatalog(sessionID, txnID, result, sysCatalogTableName, ropair.rid);
                riditer++;
            }
        }
    }
    catch (std::exception& ex)
    {
        err = ex.what();
        throw std::runtime_error(err);
    }
    catch (...)
    {
        err = "dropTableConstraint:Unknown exception caught";
        throw std::runtime_error(err);
    }

}
#endif

void AlterTableProcessor::renameTable(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                      DDLResult& result, ddlpackage::AtaRenameTable& ataRenameTable,
                                      ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId)
{
  /*Steps:
  1. Update SYSTABLE (table name)
  2. Update SYSCOLUMN (table name)
  */
  SUMMARY_INFO("AlterTableProcessor::renameTable");

  //@Bug 4599. Check whether the new table exists in infinidb
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  execplan::CalpontSystemCatalog::TableName tableName;
  tableName.schema = ataRenameTable.fQualifiedName->fSchema;
  tableName.table = ataRenameTable.fQualifiedName->fName;
  execplan::CalpontSystemCatalog::ROPair roPair;
  roPair.objnum = 0;

  try
  {
    roPair = systemCatalogPtr->tableRID(tableName);
  }
  catch (...)
  {
    roPair.objnum = 0;
  }

  if (roPair.objnum >= 3000)
    throw std::runtime_error("The new tablename is already in use.");

  ByteStream bytestream;
  bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_TABLENAME;
  bytestream << uniqueId;
  bytestream << sessionID;
  bytestream << (uint32_t)txnID;
  bytestream << fTableName.fSchema;
  bytestream << fTableName.fName;
  bytestream << ataRenameTable.fQualifiedName->fName;
  bytestream << ataRenameTable.fQualifiedName->fSchema;

  std::string errorMsg;
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1001;
  ByteStream::byte rc = 0;
  // Find out where systable is
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot");

  int pmNum = 1;

  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  OamCache* oamcache = OamCache::makeOamCache();
  boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
  pmNum = (*dbRootPMMap)[dbRoot];

  try
  {
    fWEClient->write(bytestream, (uint32_t)pmNum);
#ifdef IDB_DDL_DEBUG
    cout << "Rename table sending WE_SVR_UPDATE_SYSTABLE_TABLENAME to pm " << pmNum << endl;
#endif

    while (1)
    {
      bsIn.reset(new ByteStream());
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
        break;
      }
      else
      {
        *bsIn >> rc;
        *bsIn >> errorMsg;
        break;
      }
    }
  }
  catch (runtime_error& ex)  // write error
  {
#ifdef IDB_DDL_DEBUG
    cout << "create table got exception" << ex.what() << endl;
#endif
    rc = NETWORK_ERROR;
    errorMsg = ex.what();
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = " Unknown exception caught while updating SYSTABLE.";
#ifdef IDB_DDL_DEBUG
    cout << "create table got unknown exception" << endl;
#endif
  }

  if (rc != 0)
    throw std::runtime_error(errorMsg);

  // update SYSCOLUMN
  bytestream.restart();
  bytestream << (ByteStream::byte)WE_SVR_UPDATE_SYSCOLUMN_TABLENAME;
  bytestream << uniqueId;
  bytestream << sessionID;
  bytestream << (uint32_t)txnID;
  bytestream << fTableName.fSchema;
  bytestream << fTableName.fName;
  bytestream << ataRenameTable.fQualifiedName->fName;
  bytestream << ataRenameTable.fQualifiedName->fSchema;
  sysOid = 1021;
  // Find out where syscolumn is
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot");

  pmNum = (*dbRootPMMap)[dbRoot];

  try
  {
    fWEClient->write(bytestream, (unsigned)pmNum);
#ifdef IDB_DDL_DEBUG
    cout << "Rename table sending WE_SVR_UPDATE_SYSCOLUMN_TABLENAME to pm " << pmNum << endl;
#endif

    while (1)
    {
      bsIn.reset(new ByteStream());
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
        break;
      }
      else
      {
        *bsIn >> rc;
        *bsIn >> errorMsg;
        break;
      }
    }
  }
  catch (runtime_error& ex)  // write error
  {
#ifdef IDB_DDL_DEBUG
    cout << "create table got exception" << ex.what() << endl;
#endif
    rc = NETWORK_ERROR;
    errorMsg = ex.what();
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = " Unknown exception caught while updating SYSTABLE.";
#ifdef IDB_DDL_DEBUG
    cout << "create table got unknown exception" << endl;
#endif
  }

  if (rc != 0)
    throw std::runtime_error(errorMsg);
}

void AlterTableProcessor::tableComment(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                       DDLResult& result, ddlpackage::AtaTableComment& ataTableComment,
                                       ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId)
{
  // Currently only process autoincrement values in table comments during alter
  SUMMARY_INFO("AlterTableProcessor::tableComment");
  uint64_t nextVal;
  BRM::OID_t sysOid = 1001;
  ByteStream::byte rc = 0;
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  uint16_t dbRoot;
  std::string errorMsg;
  int pmNum = 1;
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
  OamCache* oamcache = OamCache::makeOamCache();
  boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
  pmNum = (*dbRootPMMap)[dbRoot];

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot");

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);

  boost::algorithm::to_upper(ataTableComment.fTableComment);
  std::regex compat("[[:space:]]*AUTOINCREMENT[[:space:]]*=[[:space:]]*", std::regex_constants::extended);
  std::match_results<std::string::const_iterator> what;
  std::string::const_iterator start, end;
  start = ataTableComment.fTableComment.begin();
  end = ataTableComment.fTableComment.end();
  std::regex_constants::match_flag_type flags = std::regex_constants::match_default;

  if (std::regex_search(start, end, what, compat, flags) && what[0].matched)
  {
    std::string params(&(*(what[0].second)));
    char* ep = NULL;
    const char* str = params.c_str();
    errno = 0;
    nextVal = strtoull(str, &ep, 10);

    if ((ep == str) || (*ep != '\0') || (errno != 0))
    {
      throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_INVALID_START_VALUE));
    }

    // Checks if zero and throws appropriate error (despite message name)
    // negative checks are below
    if (nextVal == 0)
    {
      throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_NEGATIVE_STARTVALUE));
    }
  }
  else
  {
    // Generic table comment, we don't need to do anything
    return;
  }

  // Get the OID for autoinc (if exists)
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = fTableName.fSchema;
  tableName.table = fTableName.fName;
  CalpontSystemCatalog::TableInfo tblInfo = systemCatalogPtr->tableInfo(tableName);

  if (tblInfo.tablewithautoincr != 1)
  {
    throw std::runtime_error("Table does not have an autoincrement column");
  }

  int32_t oid = systemCatalogPtr->autoColumOid(tableName);

  CalpontSystemCatalog::ColType type = systemCatalogPtr->colType(oid);

  bool validated = true;
  bool negative = false;

  switch (type.colDataType)
  {
    case CalpontSystemCatalog::BIGINT:
      if (static_cast<int64_t>(nextVal) > MAX_BIGINT)
        validated = false;

      if (static_cast<int64_t>(nextVal) < 1)
        negative = true;

      break;

    case CalpontSystemCatalog::UBIGINT:
      if (nextVal > MAX_UBIGINT)
        validated = false;

      break;

    case CalpontSystemCatalog::INT:
      if (static_cast<int64_t>(nextVal) > MAX_INT)
        validated = false;

      if (static_cast<int64_t>(nextVal) < 1)
        negative = true;

      break;

    case CalpontSystemCatalog::UINT:
      if (nextVal > MAX_UINT)
        validated = false;

      break;

    case CalpontSystemCatalog::MEDINT:
      if (static_cast<int64_t>(nextVal) > MAX_MEDINT)
        validated = false;

      if (static_cast<int64_t>(nextVal) < 1)
        negative = true;

      break;

    case CalpontSystemCatalog::UMEDINT:
      if (nextVal > MAX_UMEDINT)
        validated = false;

      break;

    case CalpontSystemCatalog::SMALLINT:
      if (static_cast<int64_t>(nextVal) > MAX_SMALLINT)
        validated = false;

      if (static_cast<int64_t>(nextVal) < 1)
        negative = true;

      break;

    case CalpontSystemCatalog::USMALLINT:
      if (nextVal > MAX_USMALLINT)
        validated = false;

      break;

    case CalpontSystemCatalog::TINYINT:
      if (static_cast<int64_t>(nextVal) > MAX_TINYINT)
        validated = false;

      if (static_cast<int64_t>(nextVal) < 1)
        negative = true;

      break;

    case CalpontSystemCatalog::UTINYINT:
      if (nextVal > MAX_UTINYINT)
        validated = false;

      break;

    default: break;
  }

  if (!validated)
  {
    throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_INVALID_START_VALUE));
  }

  if (negative)
  {
    throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_NEGATIVE_STARTVALUE));
  }

  fDbrm->resetAISequence(oid, nextVal);
  ByteStream bs;
  bs.restart();
  bs << (ByteStream::byte)WE_SVR_UPDATE_SYSCOLUMN_AUTOVAL;
  bs << uniqueId;
  bs << oid;
  bs << nextVal;
  bs << sessionID;

  try
  {
    fWEClient->write(bs, (uint32_t)pmNum);

    while (1)
    {
      bsIn.reset(new ByteStream());
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
        break;
      }
      else
      {
        *bsIn >> rc;
        *bsIn >> errorMsg;
        break;
      }
    }
  }
  catch (runtime_error& ex)  // write error
  {
    rc = NETWORK_ERROR;
    errorMsg = ex.what();
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = " Unknown exception caught while updating SYSTABLE.";
  }

  if (rc != 0)
    throw std::runtime_error(errorMsg);
}

void AlterTableProcessor::renameColumn(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID,
                                       DDLResult& result, ddlpackage::AtaRenameColumn& ataRenameColumn,
                                       ddlpackage::QualifiedName& fTableName, const uint64_t uniqueId)
{
  /*Steps:
  1. Update SYSCOLUMN for name, autoincrement, nextval change
  2. Update SYSTABLE if column is autoincrement column
  */
  SUMMARY_INFO("AlterTableProcessor::renameColumn");
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);

  ByteStream bs;
  std::string errorMsg;
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1001;
  ByteStream::byte rc = 0;
  // Find out where systable is
  rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
    throw std::runtime_error("Error while calling getSysCatDBRoot");

  int pmNum = 1;
  OamCache* oamcache = OamCache::makeOamCache();
  boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
  pmNum = (*dbRootPMMap)[dbRoot];
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;

  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = fTableName.fSchema;
  tableColName.table = fTableName.fName;
  tableColName.column = ataRenameColumn.fName;
  CalpontSystemCatalog::ROPair ropair;

  string err;

  try
  {
    // This gives us the rid in syscolumn that we want to update
    tableName.schema = tableColName.schema;
    tableName.table = tableColName.table;
    ropair = systemCatalogPtr->tableRID(tableName);

    if (ropair.objnum < 0)
    {
      ostringstream oss;
      oss << "No such table: " << tableName;
      throw std::runtime_error(oss.str().c_str());
    }

    ropair = systemCatalogPtr->columnRID(tableColName);

    if (ropair.objnum < 0)
    {
      ostringstream oss;
      oss << "No such column: " << tableColName;
      throw std::runtime_error(oss.str().c_str());
    }

    CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(ropair.objnum);

    if (!typesAreSame(colType, *ataRenameColumn.fNewType))
    {
      ostringstream oss;
      oss << "Changing the datatype of a column is not supported";
      throw std::runtime_error(oss.str().c_str());
    }

    //@Bug 3746 Check whether the change is about the compression type
    if (!comptypesAreCompat(colType.compressionType, (*ataRenameColumn.fNewType).fCompressiontype))
    {
      ostringstream oss;
      oss << "The compression type of an existing column cannot be changed.";
      throw std::runtime_error(oss.str().c_str());
    }

    // Check whether SYSTABLE needs to be updated

    CalpontSystemCatalog::TableInfo tblInfo = systemCatalogPtr->tableInfo(tableName);

    if (((tblInfo.tablewithautoincr == 1) && (colType.autoincrement) &&
         (ataRenameColumn.fNewType->fAutoincrement.compare("n") == 0)) ||
        ((tblInfo.tablewithautoincr == 0) && (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0)))
    {
      // update systable autoincrement column
      bs.restart();
      bs << (ByteStream::byte)WE_SVR_UPDATE_SYSTABLE_AUTO;
      bs << uniqueId;
      bs << sessionID;
      bs << (uint32_t)txnID;
      bs << fTableName.fSchema;
      bs << fTableName.fName;

      if (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0)
        bs << (uint32_t)1;
      else
        bs << (uint32_t)0;

      // send to WES to process
      try
      {
        fWEClient->write(bs, (uint32_t)pmNum);

        while (1)
        {
          bsIn.reset(new ByteStream());
          fWEClient->read(uniqueId, bsIn);

          if (bsIn->length() == 0)  // read error
          {
            rc = NETWORK_ERROR;
            errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
            break;
          }
          else
          {
            *bsIn >> rc;

            if (rc != 0)
            {
              *bsIn >> errorMsg;
            }

            break;
          }
        }
      }
      catch (runtime_error& ex)  // write error
      {
        rc = NETWORK_ERROR;
        errorMsg = ex.what();
      }
      catch (...)
      {
        rc = NETWORK_ERROR;
        errorMsg = " Unknown exception caught while updating SYSTABLE.";
      }

      if (rc != 0)
        throw std::runtime_error(errorMsg);

      // change a sequence in controller
      if ((!(tblInfo.tablewithautoincr == 1) || (colType.autoincrement)) &&
          (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0))
      {
        fDbrm->startAISequence(ropair.objnum, ataRenameColumn.fNewType->fNextvalue,
                               ataRenameColumn.fNewType->fLength,
                               convertDataType(ataRenameColumn.fNewType->fType));
        // Reset it in case there is a sequence already
        fDbrm->resetAISequence(ropair.objnum, ataRenameColumn.fNewType->fNextvalue);
      }
    }
    else if ((tblInfo.tablewithautoincr == 1) && (colType.autoincrement) &&
             (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0))
    {
    }
    else
    {
      fDbrm->resetAISequence(ropair.objnum, 0);
    }

    // Update SYSCOLUMN
    bs.restart();
    bs << (ByteStream::byte)WE_SVR_UPDATE_SYSCOLUMN_RENAMECOLUMN;
    bs << uniqueId;
    bs << sessionID;
    bs << (uint32_t)txnID;
    bs << fTableName.fSchema;
    bs << fTableName.fName;
    bs << ataRenameColumn.fName;
    bs << ataRenameColumn.fNewName;
    bs << ataRenameColumn.fNewType->fAutoincrement;
    //@Bug 5913. for autoincrement column, find the next value from SYSCOLUMN
    long long nextVal = ataRenameColumn.fNewType->fNextvalue;

    if ((tblInfo.tablewithautoincr == 1) && (colType.autoincrement) &&
        (ataRenameColumn.fNewType->fAutoincrement.compare("y") == 0))
      nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);

    bs << (uint64_t)nextVal;
    uint32_t nullable = 1;
    string defaultValue("");

    if (ataRenameColumn.fConstraints.size() > 0)
    {
      for (uint32_t j = 0; j < ataRenameColumn.fConstraints.size(); j++)
      {
        if (ataRenameColumn.fConstraints[j]->fConstraintType == DDL_NOT_NULL)
        {
          nullable = 0;
          break;
        }
      }
    }

    bs << nullable;

    if (ataRenameColumn.fDefaultValue)
      defaultValue = ataRenameColumn.fDefaultValue->fValue;

    bs << defaultValue;
    sysOid = 1021;
    // Find out where syscolumn is
    rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);

    if (rc != 0)
      throw std::runtime_error("Error while calling getSysCatDBRoot");

    pmNum = (*dbRootPMMap)[dbRoot];

    // send to WES to process
    try
    {
      fWEClient->write(bs, (uint32_t)pmNum);

      while (1)
      {
        bsIn.reset(new ByteStream());
        fWEClient->read(uniqueId, bsIn);

        if (bsIn->length() == 0)  // read error
        {
          rc = NETWORK_ERROR;
          errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
          break;
        }
        else
        {
          *bsIn >> rc;

          if (rc != 0)
          {
            *bsIn >> errorMsg;
          }

          break;
        }
      }
    }
    catch (runtime_error& ex)  // write error
    {
      rc = NETWORK_ERROR;
      errorMsg = ex.what();
    }
    catch (...)
    {
      rc = NETWORK_ERROR;
      errorMsg = " Unknown exception caught while updating SYSTABLE.";
    }

    if (rc != 0)
      throw std::runtime_error(errorMsg);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    throw std::runtime_error(err);
  }
  catch (...)
  {
    err = "renameColumn:Unknown exception caught";
    throw std::runtime_error(err);
  }
}

}  // namespace ddlpackageprocessor

#ifdef __clang__
#pragma clang diagnostic pop
#endif
